1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package rx.internal.operators;
17
import rx.Observable.Operator;
import rx.*;
import rx.annotations.Experimental;
import rx.exceptions.Exceptions;
import rx.functions.Func1;
22
23
24
25
26
27
28 @Experimental
29 public final class OperatorTakeUntilPredicate<T> implements Operator<T, T> {
30
31 private final class ParentSubscriber extends Subscriber<T> {
32 private final Subscriber<? super T> child;
33 private boolean done = false;
34
35 private ParentSubscriber(Subscriber<? super T> child) {
36 this.child = child;
37 }
38
39 @Override
40 public void onNext(T args) {
41 child.onNext(args);
42
43 boolean stop = false;
44 try {
45 stop = stopPredicate.call(args);
46 } catch (Throwable e) {
47 done = true;
48 child.onError(e);
49 unsubscribe();
50 return;
51 }
52 if (stop) {
53 done = true;
54 child.onCompleted();
55 unsubscribe();
56 }
57 }
58
59 @Override
60 public void onCompleted() {
61 if (!done) {
62 child.onCompleted();
63 }
64 }
65
66 @Override
67 public void onError(Throwable e) {
68 if (!done) {
69 child.onError(e);
70 }
71 }
72 void downstreamRequest(long n) {
73 request(n);
74 }
75 }
76
77 private final Func1<? super T, Boolean> stopPredicate;
78
79 public OperatorTakeUntilPredicate(final Func1<? super T, Boolean> stopPredicate) {
80 this.stopPredicate = stopPredicate;
81 }
82
83 @Override
84 public Subscriber<? super T> call(final Subscriber<? super T> child) {
85 final ParentSubscriber parent = new ParentSubscriber(child);
86 child.add(parent);
87 child.setProducer(new Producer() {
88 @Override
89 public void request(long n) {
90 parent.downstreamRequest(n);
91 }
92 });
93
94 return parent;
95 }
96
97 }